/*
 * Copyright 2019-2021 the original author or authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cool.houge.ws;

import com.google.common.annotations.VisibleForTesting;
import cool.houge.ws.packet.ErrorPacket;
import cool.houge.ws.packet.Packet;
import cool.houge.ws.session.Session;
import cool.houge.ws.session.SessionGroupManager;
import cool.houge.ws.session.SessionManager;
import io.avaje.jsonb.JsonException;
import io.avaje.jsonb.Jsonb;
import io.netty.buffer.ByteBufInputStream;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.InputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

/**
 * 消息处理器.
 *
 * @author KK (kzou227@qq.com)
 */
@Singleton
public class WebsocketHandler {

  private static final Logger log = LogManager.getLogger();

  @Inject SessionManager sessionManager;
  @Inject SessionGroupManager sessionGroupManager;

  @Inject AuthHandler authHandler;
  @Inject PacketHandler packetHandler;
  @Inject ErrorHandler errorHandler;

  // JSON序列化
  private static final Jsonb JSONB = Jsonb.builder().build();

  /**
   * @param inbound
   * @param outbound
   * @return
   */
  public Mono<Void> handle(WebsocketInbound inbound, WebsocketOutbound outbound) {
    // 1. 会话认证
    // 2. 接收WebSocket消息
    return Mono.defer(() -> authHandler.handle(inbound, outbound))
        .doOnSuccess(session -> receiveFrames(inbound, outbound, session))
        .then(outbound.neverComplete())
        .onErrorResume(
            t -> {
              log.error("未处理的异常", t);
              return outbound.sendClose();
            });
  }

  @VisibleForTesting
  void receiveFrames(WebsocketInbound inbound, WebsocketOutbound outbound, Session session) {
    Flux.defer(() -> inbound.aggregateFrames().receiveFrames())
        .doFinally(
            signalType -> {
              log.info("会话中止 signType={} {}", signalType, session);
              // 删除会话管理器中的 Session
              sessionManager.remove(session).subscribe();
              // 删除群组会话管理器中的 Session
              sessionGroupManager.unsubGroups(session, session.subGroupIds()).subscribe();
            })
        .doOnError(
            ex -> {
              log.error("未处理的异常 session={}", session, ex);
              outbound.sendClose().subscribe();
            })
        .subscribe(
            frame -> {
              // 处理WebSocket消息
              processFrame(frame, session);
            });
  }

  @VisibleForTesting
  void processFrame(WebSocketFrame frame, Session session) {
    InputStream in = new ByteBufInputStream(frame.content());
    Packet packet;
    try {
      packet = JSONB.type(Packet.class).fromJson(in);
    } catch (JsonException e) {
      log.debug("非法的请求包 session={}", session, e);
      var ep = new ErrorPacket().setCode(ErrorCodes.PACKET).setMessage("非法的请求包");
      session.send(ep).subscribe();
      return;
    }

    Mono.defer(
            () -> {
              log.debug("处理消息包 session={} packet={}", session, packet);
              return packetHandler.handle(session, packet);
            })
        .onErrorResume(
            t -> {
              // 错误处理
              return errorHandler.handle(session, t);
            })
        .subscribe();
  }
}
